<?php

namespace app;

use RdKafka\Producer;

class KafkaHelper
{
    //使用前请安装rdkafka扩展，依赖相关composer包："kwn/php-rdkafka-stubs": "^1.3"（仅用作IDE提示作为，框架运行时不会自动加载相关文件的）
    private static $instance;
    private static $brokers;
    private static $topic;
    private static $producerTopic;
    private static $partition = 0; //默认只使用一个分区，0分区

    public static function instance(string $brokers, string $topic = 'common')
    {
        if (is_null(self::$brokers) || self::$brokers != $brokers) {
            self::$brokers = $brokers;
            self::$producerTopic = null;
            self::$instance = null;
        }
        if (is_null(self::$topic) || self::$topic != $topic) {
            self::$topic = $topic;
            self::$producerTopic = null;
            self::$instance = null;
        }
        if (is_null(self::$instance)) {
            self::$instance = new static();
        }
        return self::$instance;
    }

    /**
     * 获取生产者topic
     * Author:我只想看看蓝天<1207032539@qq.com>
     * @return \RdKafka\ProducerTopic
     */
    private function getProducerTopic()
    {
        if (is_null(self::$producerTopic)) {
            //new一个生产者
            $kafka = new Producer();
            //设置日志等级
//            $kafka->setLogLevel(LOG_DEBUG);
            //设置代理
            $kafka->addBrokers(self::$brokers);
            //设置topic
            self::$producerTopic = $kafka->newTopic(self::$topic);
        }
        return self::$producerTopic;
    }

    /**
     * 生产消息
     * Author:我只想看看蓝天<1207032539@qq.com>
     * @param $message //消息体
     * @param string $key //消息key
     */
    public function produce($message)
    {
        $message = serialize($message);
        //发布消息
        $this->getProducerTopic()->produce(self::$partition, 0, $message);
    }

}
